package com.isyscore.os.etl.model;


import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.isyscore.boot.login.LoginUserManagerImpl;
import com.isyscore.os.core.model.entity.JobConfig;
import com.isyscore.os.core.model.entity.JobRunLog;
import com.isyscore.os.core.util.ApplicationUtils;
import com.isyscore.os.etl.model.dto.ExceptionDTO;
import com.isyscore.os.etl.model.dto.JobStandaloneInfoDTO;
import com.isyscore.os.etl.model.enums.DeployModeEnum;
import com.isyscore.os.etl.model.enums.JobConfigStatus;
import com.isyscore.os.etl.model.enums.JobRunStatus;
import com.isyscore.os.etl.service.JobRunLogService;
import com.isyscore.os.etl.service.impl.JobConfigServiceImpl;
import com.isyscore.os.etl.utils.CommandUtils;
import com.isyscore.os.permission.entity.LoginVO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


/**
 * @author codezzz
 * @Description:
 * @date 2021/9/1 14:09
 */

@Slf4j
public class StatusAndLogHandler implements Runnable{


    private final StringBuffer logAppender;

    private Integer retry = 5;

    private final String jobRunLogId;

    private LoginVO current;

    /**
     * jobid
     */
    private String appid;

    private final JobConfig config;

    private static final JobConfigServiceImpl jobConfigService = ApplicationUtils.getBean(JobConfigServiceImpl.class);

    private static final JobRunLogService jobRunLogService = ApplicationUtils.getBean(JobRunLogService.class);

    private static final String LOCAL_FLINK_HTTP_ADDRESS =
            ApplicationUtils.getProperty("flink-config.host") +  ":" + ApplicationUtils.getProperty("flink-config.port");

    public StatusAndLogHandler(StringBuffer logAppender, String appid, JobConfig jobConfig, String jobRunLogId, LoginVO current) {
        this.logAppender = logAppender;
        this.appid = appid;
        this.config = jobConfig;
        this.jobRunLogId = jobRunLogId;
        this.current = current;
    }


    @SneakyThrows
    @Override
    public void run() {
        LoginUserManagerImpl loginUserManager = ApplicationUtils.getBean(LoginUserManagerImpl.class);
        loginUserManager.setThreadLocalLoginUser(this.current);
        String flinkHttpAddress = LOCAL_FLINK_HTTP_ADDRESS;

        if (!DeployModeEnum.LOCAL.name().equals(config.getDeployMode())) {
            flinkHttpAddress = CommandUtils.parseFlinkHttpAddress(config.getFlinkRunConfig());
        }

        JobRunLog runLog = jobRunLogService.getById(jobRunLogId);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setId(config.getId());
        log.info("{}开始同步job#{}的状态：", Thread.currentThread().getName() ,appid);
        JobStandaloneInfoDTO jobStandaloneInfoDTO;
        int countDown = 3;
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            jobStandaloneInfoDTO = jobConfigService.getJobInfoForStandaloneByAppId(appid,
                    flinkHttpAddress);
            if (jobStandaloneInfoDTO == null || jobStandaloneInfoDTO.getErrors() != null || StringUtils.isEmpty(jobStandaloneInfoDTO.getState())) {
                //flink侧查询不到数据，有以下情况:
                //1. 过期清除
                //2. 手动删除
                if (jobStandaloneInfoDTO == null && countDown > 0) {
                    countDown--;
                    continue;
                }
                log.error("查询flink端任务信息出错:{}", (jobStandaloneInfoDTO == null ? "查询不到任务信息" : jobStandaloneInfoDTO.getErrors()));
                //更新job信息
                jobConfig.setStatus(JobConfigStatus.FAILED.getCode());
                jobConfig.setJobId("");
                jobConfig.setEditor("sys_auto");
                jobConfigService.updateById(jobConfig);
                //更新日志信息
                runLog.setJobStatus(JobRunStatus.FAILED.name());
                runLog.setJobId("");
                logAppender.append("查询flink端任务信息出错:").append(jobStandaloneInfoDTO == null ? "查询不到 flink 侧相关信息" : jobStandaloneInfoDTO.getErrors());
                runLog.setLocalLog(logAppender.toString());
                runLog.setEndTime(LocalDateTime.now());
                runLog.setUpdatedAt(LocalDateTime.now());
                jobRunLogService.updateById(runLog);
                return;
            }
            // 获取flink端任务状态
            String statusOnFlink = jobStandaloneInfoDTO.getState();
            //终止状态，更新数据库即可
            if (Objects.equals(statusOnFlink, JobRunStatus.FAILED.name())) {
                log.error("flink 侧运行任务失败，失败原因：{}", jobStandaloneInfoDTO.getErrors());
                runLog.setJobStatus(JobRunStatus.FAILED.name());
                //2022.01.14 获取flink端异常信息
                ExceptionDTO exception = JSONObject.parseObject(jobConfigService.getExceptions(appid, flinkHttpAddress), ExceptionDTO.class);
                String res = "";
                if (exception != null) {
                    res = exception.getAllExceptions().parallelStream().map(e -> (String)e.get("exception")).collect(Collectors.joining("\n"));
                }
                logAppender.append("flink 侧运行任务失败，失败原因：" + res);
                runLog.setLocalLog(logAppender.toString());
                runLog.setEndTime(LocalDateTime.now());
                runLog.setUpdatedAt(LocalDateTime.now());
                jobConfig.setStatus(JobConfigStatus.FAILED.getCode());
                jobConfigService.updateById(jobConfig);
                jobRunLogService.updateById(runLog);
                return;
            }else if (Objects.equals(statusOnFlink, JobRunStatus.CANCELED.name())) {
                log.info("flink 侧运行任务取消");
                logAppender.append("任务被取消: " + LocalDateTime.now());
                runLog.setJobStatus(JobRunStatus.FAILED.name());
                runLog.setLocalLog(logAppender.toString());
                runLog.setEndTime(LocalDateTime.now());
                runLog.setUpdatedAt(LocalDateTime.now());
                jobConfig.setStatus(JobConfigStatus.CANCELED.getCode());
                jobConfigService.updateById(jobConfig);
                jobRunLogService.updateById(runLog);
                return;
            } else if (Objects.equals(statusOnFlink, JobRunStatus.FINISHED.name())) {
                log.info("flink 侧运行任务完成:{}", LocalDateTime.now());
                logAppender.append("任务完成：" + LocalDateTime.now());
                runLog.setLocalLog(logAppender.toString());
                runLog.setJobStatus(JobRunStatus.SUCCESS.name());
                runLog.setEndTime(LocalDateTime.now());
                runLog.setUpdatedAt(LocalDateTime.now());
                jobConfig.setStatus(JobConfigStatus.FINISHED.getCode());
                jobConfigService.updateById(jobConfig);
                jobRunLogService.updateById(runLog);
                return;
            }else if (Objects.equals(statusOnFlink, JobRunStatus.RUNNING.name())) {
                runLog.setJobStatus(JobRunStatus.RUNNING.name());
                runLog.setUpdatedAt(LocalDateTime.now());
                jobRunLogService.updateById(runLog);
            }
            if (StrUtil.equalsAny(statusOnFlink,JobRunStatus.INITIALIZING.name(),JobRunStatus.RESTARTING.name(),JobRunStatus.CREATED.name())) {
                if (retry <= 0) {
                    runLog.setJobStatus(JobRunStatus.FAILED.name());
                    runLog.setLocalLog(logAppender.toString());
                    runLog.setEndTime(LocalDateTime.now());
                    runLog.setUpdatedAt(LocalDateTime.now());
                    jobConfig.setStatus(JobConfigStatus.FAILED.getCode());
                    jobConfigService.updateById(jobConfig);
                    jobRunLogService.updateById(runLog);
                    //不仅仅是更改状态，flink侧也要停止掉
                    jobConfigService.cancelJobForFlinkByAppId(jobConfig);
                    return;
                }
                logAppender.append("flink 侧任务正在：" + statusOnFlink + ", 重试中。。\n");
                retry--;
            }
        }
    }

}
